ea5f7983c73c3de59bc2376ba809c714c4811fc7,debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java,BinlogReader,handleUpdate,#Event#,465
Before Change
Serializable[] after = changes.getValue();
count += recordMaker.update(before, after, ts, row, numRows);
}
logger.debug("Recorded {} update records for event: {}", count, event);
} else {
logger.debug("Skipping update row event: {}", event);
}
After Change
* @throws InterruptedException if this thread is interrupted while blocking
*/
protected void handleUpdate(Event event) throws InterruptedException {
if (skipEvent) {
// We can skip this because we should already be at least this far ...
logger.debug("Skipping previously processed row event: {}", event);
return;
}
UpdateRowsEventData update = unwrapData(event);
long tableNumber = update.getTableId();
BitSet includedColumns = update.getIncludedColumns();
// BitSet includedColumnsBefore = update.getIncludedColumnsBeforeUpdate();
RecordsForTable recordMaker = recordMakers.forTable(tableNumber, includedColumns, super::enqueueRecord);
if (recordMaker != null) {
List<Entry<Serializable[], Serializable[]>> rows = update.getRows();
Long ts = context.clock().currentTimeInMillis();
int count = 0;
int numRows = rows.size();
if (startingRowNumber < numRows) {
for (int row = startingRowNumber; row != numRows; ++row) {
Map.Entry<Serializable[], Serializable[]> changes = rows.get(row);
Serializable[] before = changes.getKey();
Serializable[] after = changes.getValue();
count += recordMaker.update(before, after, ts, row, numRows);
}
if (logger.isDebugEnabled()) {
if (startingRowNumber != 0) {
logger.debug("Recorded {} update record(s) for last {} row(s) in event: {}",
count, numRows - startingRowNumber, event);
} else {
logger.debug("Recorded {} update record(s) for event: {}", count, event);
}
}
} else {
// All rows were previously processed ...
logger.debug("Skipping previously processed update event: {}", event);
}
} else {
logger.debug("Skipping update row event: {}", event);